Spring Async的使用&MDC继承

您所在的位置:网站首页 reactor 异步 Spring Async的使用&MDC继承

Spring Async的使用&MDC继承

2023-01-23 11:38| 来源: 网络整理| 查看: 265

项目中很多任务都可以异步完成,比如消息通知等。 可以借用Spring Async注解,可以很快的实现异步调用。另外为了方便跟踪请求日志,一般会借助MDC在日志中输出traceId,但是跨线程执行的时候的,MDC信息并不会传递,所以需要自定义线程执行器。

启用AsyncSpring Boot配置Async添加注解@EnableAsync @Slf4j @EnableAsync @SpringBootApplication public class AsyncApplication implements ApplicationRunner { @Resource private PersonManager personManager; public static void main(String[] args) { SpringApplication.run(AsyncApplication.class, args); } @Override public void run(ApplicationArguments args) throws Exception { log.info("run application"); personManager.sayHello(); } }异步接口添加注解@Async @Slf4j @Component public class PersonManager { @Async public void sayHello() { log.info("Hello World!"); } }执行结果[2019-07-19 21:23:21.823][main][INFO][AsyncApplication:33][][]: run application [2019-07-19 21:23:21.824][main][DEBUG][DefaultSingletonBeanRegistry:213][][]: Creating shared instance of singleton bean 'applicationTaskExecutor' [2019-07-19 21:23:21.824][main][DEBUG][ConstructorResolver:777][][]: Autowiring by type from bean name 'applicationTaskExecutor' via factory method to bean named 'taskExecutorBuilder' [2019-07-19 21:23:21.830][main][INFO][ExecutorConfigurationSupport:171][][]: Initializing ExecutorService 'applicationTaskExecutor' [2019-07-19 21:23:21.835][task-1][INFO][PersonManager:16][][]: Hello World!

执行日志中可以看到sayHello函数是在任务执行器applicationTaskExecutor的线程task-1执行的,不是main线程

自定义Async线程池@Bean public AsyncTaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setThreadNamePrefix("Anno-Executor"); executor.setMaxPoolSize(10); return executor; }执行结果[2019-07-19 21:25:17.952][main][INFO][AsyncApplication:33][][]: run application [2019-07-19 21:25:17.958][Anno-Executor1][INFO][PersonManager:16][][]: Hello World!自定义MDC可继承的ThreadPoolTaskExecutor

当我们在日志中使用MDC实现调用链路跟踪时(使用traceId),如果异步调用,则会丢失MDC信息。所以建议使用下面的MdcThreadPoolTaskExecutor

public class MdcThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { final private boolean useFixedContext; final private Map fixedContext; /** * Pool where task threads take MDC from the submitting thread. */ public static MdcThreadPoolTaskExecutor newWithInheritedMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, int queueCapacity) { return new MdcThreadPoolTaskExecutor(null, corePoolSize, maximumPoolSize, keepAliveTime, unit, queueCapacity); } private MdcThreadPoolTaskExecutor(Map fixedContext, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, int queueCapacity) { setCorePoolSize(corePoolSize); setMaxPoolSize(maximumPoolSize); setKeepAliveSeconds((int) unit.toSeconds(keepAliveTime)); setQueueCapacity(queueCapacity); this.fixedContext = fixedContext; useFixedContext = (fixedContext != null); } private Map getContextForTask() { return useFixedContext ? fixedContext : MDC.getCopyOfContextMap(); } /** * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.) * all delegate to this. */ @Override public void execute(@NonNull Runnable command) { super.execute(wrap(command, getContextForTask())); } @NonNull @Override public Future submit(@NonNull Runnable task) { return super.submit(wrap(task, getContextForTask())); } @NonNull @Override public Future submit(@NonNull Callable task) { return super.submit(wrap(task, getContextForTask())); } private static Callable wrap(final Callable task, final Map context) { return () -> { Map previous = MDC.getCopyOfContextMap(); if (context == null) { MDC.clear(); } else { MDC.setContextMap(context); } try { return task.call(); } finally { if (previous == null) { MDC.clear(); } else { MDC.setContextMap(previous); } } }; } private static Runnable wrap(final Runnable runnable, final Map context) { return () -> { Map previous = MDC.getCopyOfContextMap(); if (context == null) { MDC.clear(); } else { MDC.setContextMap(context); } try { runnable.run(); } finally { if (previous == null) { MDC.clear(); } else { MDC.setContextMap(previous); } } }; } }使用MdcThreadPoolTaskExecutor@Slf4j @EnableAsync @SpringBootApplication public class AsyncApplication implements ApplicationRunner { @Resource private PersonManager personManager; public static void main(String[] args) { SpringApplication.run(AsyncApplication.class, args); } @Override public void run(ApplicationArguments args) throws Exception { MDC.put("traceId", UUID.randomUUID().toString()); log.info("run application"); personManager.sayHello(); } @Bean public AsyncTaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = MdcThreadPoolTaskExecutor.newWithInheritedMdc(8, 32, 1, TimeUnit.MINUTES, 1000); executor.setThreadNamePrefix("Anno-Executor"); executor.setMaxPoolSize(10); return executor; } }执行结果[2019-07-19 21:29:58.567][main][INFO][AsyncApplication:32][][07570316-f690-44c5-adb6-dc69c097323f]: run application [2019-07-19 21:29:58.575][Anno-Executor1][INFO][PersonManager:16][][07570316-f690-44c5-adb6-dc69c097323f]: Hello World!

可以看到traceId也传递到线程Anno-Executor1了

参考自带监控&兼容MDC的线程池Spring @Async异步调用(异步线程池)


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3